6  Combining Updated Locations Data with Initial Trip Data

6.1 Introduction

If you recall, we initially separated the location data from the non-location data so that we could enrich the locations with additional attributes, such as median household income and population density around pickup and dropoff points.

Having accomplished that, we will now combine the updated locations data with the rest of the trip data.

This will be the last chapter for me, but please go further: create maps based on this data, and perform machine learning analysis too. The sparklyr machine learning documentation is a good place to start.

It has been a joy to come this far, and I hope you have learnt something new throughout this whole tutorial.

FYI, because we are not using Apache Sedona in this chapter, we use the same Spark configuration as in Chapter One.

Anyhow, let us get to work!

# Load required libraries
library(sparklyr)   # Spark connection and data manipulation
library(dplyr)      # Data manipulation functions

# Install and set up Spark environment
spark_install("3.5.5")  # Install the specific version of Spark (3.5.5)

# Set Java and Spark home directory paths
Sys.setenv("JAVA_HOME"="/Library/Java/JavaVirtualMachines/zulu-11.jdk/Contents/Home")  # Set Java home directory for Spark
Sys.setenv("SPARK_HOME"=spark_home_dir(version = "3.5.5"))  # Set Spark home directory

# Define working directory path for file management
working_dir <- "/Users/rodgersiradukunda/Library/CloudStorage/OneDrive-TheUniversityofLiverpool/geospatial_docker"
setwd(working_dir)  # Set the working directory

# Define path for Spark data
spark_dir <- file.path(getwd(), "data", "spark")

# Create an empty list for Spark configuration settings
config <- list()

# Set Spark configurations for memory and performance optimisation
config$`sparklyr.shell.driver-java-options` <- paste(
  paste0("-Djava.io.tmpdir=", spark_dir),  # Set temporary directory for Spark
  "-XX:+UseCompressedOops"                 # Use compressed Oops for JVM performance
)  # Both JVM flags go in one string; a second assignment would overwrite the first
config$`sparklyr.shell.driver-memory` <- '10G'  # Allocate 10GB of memory for the Spark driver
config$spark.memory.fraction <- 0.7  # Fraction of heap shared by Spark execution and storage
config$spark.sql.shuffle.partitions.local <- 24  # Set shuffle partitions (local setting based on workload)
config$spark.driver.extraJavaOptions <- "-Xmx8G"  # Extra driver JVM option (max heap); note it differs from the 10G driver memory above
config$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"  # Use KryoSerializer for better performance
config$spark.memory.offHeap.enabled <- "true"  # Enable off-heap memory usage
config$spark.memory.offHeap.size <- "4g"  # Set 4GB for off-heap memory
config$spark.sql.shuffle.spill <- "false"  # Disable shuffle spill to disk
config$spark.cleaner.periodicGC.interval <- "60s"  # Periodic garbage collection interval
config$spark.sql.files.maxPartitionBytes <- "200m"  # Max bytes packed into a single partition when reading files
config$spark.sql.adaptive.enabled <- "true"  # Enable adaptive query execution

# Connect to Spark with the specified configurations
sc <- spark_connect(
  master = "local[*]",  # Use all available cores for local execution
  config = config,      # Use the specified configurations
  packages = "delta"    # Load the Delta Lake package so Delta tables can be read and written
)

# Open Spark web UI for monitoring the connection
spark_web(sc)
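
A small point about list-based configuration that is easy to miss: assigning to the same element of an R list twice keeps only the last value, so settings such as `sparklyr.shell.driver-java-options`, which carry several JVM flags, need all flags in one space-separated string. The following is a minimal sketch in plain R (no Spark needed); the temporary directory is a hypothetical stand-in for `spark_dir`.

```r
# Hypothetical temp directory, used only for this illustration
tmp_dir <- file.path(tempdir(), "spark")

# Assigning twice: the second value replaces the first
overwritten <- list()
overwritten$`sparklyr.shell.driver-java-options` <- paste0("-Djava.io.tmpdir=", tmp_dir)
overwritten$`sparklyr.shell.driver-java-options` <- "-XX:+UseCompressedOops"

# Combining both flags into one space-separated string keeps them both
combined <- list()
combined$`sparklyr.shell.driver-java-options` <- paste(
  paste0("-Djava.io.tmpdir=", tmp_dir),
  "-XX:+UseCompressedOops"
)

print(overwritten$`sparklyr.shell.driver-java-options`)
print(combined$`sparklyr.shell.driver-java-options`)
```

Printing the two values side by side makes it clear which flags would actually reach the driver JVM.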

6.2 Load the datasets

We start by loading the datasets.

As you can see below, we repartition each dataset into 24 partitions and also specify the column to partition by: trip_id, which is common to both datasets.

This is because we shall later join the datasets based on this column, and we want rows with the same trip_id to be in the same partitions so as to minimise shuffling of data, which is quite computationally intensive.

# Read locations dataset in Delta format and register as a SQL view for querying
locations_sdf_updated_three <- spark_read_delta(
  sc,
  path = file.path(getwd(), "data", "locations_sdf_updated_three")
) %>%
  filter(trip_id > 40000000) %>%  # Optional filtering
  sdf_repartition(partitions = 24, partition_by = "trip_id") %>%  # Repartition data for efficiency
  sdf_register("locations_sdf_updated_three_view")  # Register as view for SQL queries

# Read trip data in Delta format and register as a SQL view
trip_data_sdf <- spark_read_delta(
  sc,
  path = file.path(getwd(), "data", "trip_data_sdf")
) %>% 
  filter(trip_id > 40000000) %>%  # Optional filtering
  sdf_repartition(partitions = 24, partition_by = "trip_id") %>%  # Repartition for optimal join performance
  sdf_register("trip_data_sdf")
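
The idea behind partitioning both tables by trip_id can be illustrated with a toy, Spark-free sketch. It assumes a simple modulo as a stand-in for the hash Spark applies to the key, and the helper `assign_partition()` and the sample trip_id values are hypothetical. The point is only that when both tables use the same rule on the same key, matching rows end up in the same partition.

```r
# Toy illustration only: Spark hashes the key rather than taking a plain modulo
n_partitions <- 24

# Hypothetical helper standing in for Spark's partitioning rule
assign_partition <- function(trip_id, n) trip_id %% n

trips <- data.frame(trip_id = c(40000001, 40000002, 40000025))
locations <- data.frame(
  trip_id   = c(40000001, 40000001, 40000002, 40000002, 40000025, 40000025),
  is_pickup = c(1, 0, 1, 0, 1, 0)
)

# Apply the same rule to both tables
trips$partition     <- assign_partition(trips$trip_id, n_partitions)
locations$partition <- assign_partition(locations$trip_id, n_partitions)

# Match rows on trip_id and compare the partitions they were assigned to
matched <- merge(trips, locations, by = "trip_id", suffixes = c("_trip", "_loc"))
co_located <- all(matched$partition_trip == matched$partition_loc)
print(co_located)
```

Because every location row shares a partition with its trip, a join on trip_id could in principle be done partition by partition, without moving rows around.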

Just a refresher on how these datasets look.

print(locations_sdf_updated_three, width=Inf)
print(trip_data_sdf, width=Inf)

6.3 Joining the datasets

We start by joining the non-location data with the pickup location data, renaming certain variables, and dropping others in the process.

# Join trip data with pickup locations using trip_id and rename selected columns for clarity
merged_one <- left_join(
  trip_data_sdf,
  locations_sdf_updated_three %>% filter(is_pickup == 1),
  by = "trip_id"
) %>% 
  rename(
    pickup_borough = BoroName,
    pickup_neighbourhood = NTAName, 
    pickup_neigh_hhincome = MdHHIncE,
    pickup_pop_density = pop_density,
    pickup_lcz_label = lcz_label
  ) %>% 
  select(
    trip_id,
    VendorID, 
    tpep_pickup_datetime, 
    tpep_dropoff_datetime, 
    passenger_count,
    trip_distance,
    pickup_hour,
    pickup_dayofweek,
    pickup_week,
    pickup_month,
    dropoff_hour,
    dropoff_dayofweek,
    dropoff_week,
    dropoff_month,
    is_weekend_pickup,
    is_weekend_dropoff,
    is_rush_hour_pickup,
    trip_distance_scaled,
    pickup_borough,
    pickup_neighbourhood,
    pickup_neigh_hhincome,
    pickup_pop_density,
    pickup_lcz_label
  )

# Display the merged pickup-enriched dataset
print(merged_one, width = Inf)

We now join our updated data with dropoff location data, again renaming certain variables and dropping others.

# Join the merged pickup dataset with dropoff locations and rename/drop unnecessary columns
merged_two <- left_join(
  merged_one,
  locations_sdf_updated_three %>% filter(is_pickup == 0),
  by = "trip_id"
) %>% 
  rename(
    dropoff_borough = BoroName,
    dropoff_neighbourhood = NTAName, 
    dropoff_neigh_hhincome = MdHHIncE,
    dropoff_pop_density = pop_density,
    dropoff_lcz_label = lcz_label
  ) %>% 
  select(-c(latitude, longitude, is_pickup, lcz_class))  # Drop redundant columns

This is what our final dataset looks like.

# Print the final merged dataset enriched with both pickup and dropoff spatial context
print(merged_two, width = Inf)
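
The two joins above turn the long locations table (one row per pickup or dropoff point) into a wide one (one row per trip). A minimal sketch of the same pattern on toy, in-memory data with plain dplyr may make this easier to follow; the tibbles and their values are made up for illustration, and only a single location attribute is carried through.

```r
library(dplyr)

# Toy stand-ins for the trip and location tables (values are invented)
trips <- tibble(trip_id = c(1, 2), trip_distance = c(2.5, 7.1))
locations <- tibble(
  trip_id   = c(1, 1, 2, 2),
  is_pickup = c(1, 0, 1, 0),
  BoroName  = c("Manhattan", "Brooklyn", "Queens", "Manhattan")
)

# Step 1: attach pickup rows and give the attribute a pickup_ prefix
with_pickup <- trips %>%
  left_join(filter(locations, is_pickup == 1), by = "trip_id") %>%
  rename(pickup_borough = BoroName) %>%
  select(-is_pickup)

# Step 2: attach dropoff rows and give the attribute a dropoff_ prefix
with_both <- with_pickup %>%
  left_join(filter(locations, is_pickup == 0), by = "trip_id") %>%
  rename(dropoff_borough = BoroName) %>%
  select(-is_pickup)

print(with_both)

# A left join should not change the number of trips if each trip has
# at most one pickup row and one dropoff row
print(nrow(with_both) == nrow(trips))
```

On the Spark tables themselves, comparing `sdf_nrow(trip_data_sdf)` with `sdf_nrow(merged_two)` would be the analogous sanity check; a larger count after the join would suggest duplicate trip_id values in the locations data.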

Finally, we write the data to disk. As stated earlier, feel free to go beyond this tutorial, creating maps based on NYC neighbourhoods and building models. Good luck, and thank you!

# Define output path to save final merged dataset in Delta format
locations_sdf_updated_four_file_path <- file.path(
  getwd(),
  "data",
  "locations_sdf_updated_four"
)

# Write final dataset in Delta format; append mode adds to any existing table, so re-running this step would duplicate rows
spark_write_delta(
  merged_two,
  path = locations_sdf_updated_four_file_path,
  mode = "append"
)
# Disconnect from the Spark session to free resources
spark_disconnect(sc)